package cn.mrcode.wxsdk.core.dialogue.common.distributed;

import cn.mrcode.wxsdk.core.dialogue.common.log.LogTemplateUtil;
import com.alibaba.fastjson.JSONObject;
import org.apache.zookeeper.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;

/**
 * 分布式系统中，自动维护 选举单节点 任务更新 的 抽象类
 * @author zhuqiang
 * @version V1.0
 * @date 2015/9/23 10:48
 */
@Deprecated
public abstract class ZkDistributedSingleNodeExecutor implements Watcher {
    private static Logger log = LoggerFactory.getLogger(ZkDistributedSingleNodeExecutor.class);
    private int sessionTimeout;
    private String zkServiceList;
    private String rootPath;
    private ZooKeeper zkClient = null;
    private String ip;
    private String thisPath;
    private CountDownLatch cdl = new CountDownLatch(1);
    private CountDownLatch cdlIp = new CountDownLatch(1);
    private boolean isRunTask = false;  //记录任务是否已被开启
    private Object obj;

    /**
     *  构建 执行器
     * @param sessionTimeout 链接超时毫秒
     * @param zkServiceList zk服务器地址，逗号隔开
     * @param rootPath 根目录 路径（不存在将被自动创建，最好存在） 例如： /wxBaseToken
     * @param obj 业务参数，解决某些场景下：第一次推送数据的时候，由于构造块里面的代码还没有执行完成  ，而第一次推送数据中又需要用到该参数，该参数会在首次推送数据里面回调传递
     */
    public ZkDistributedSingleNodeExecutor(int sessionTimeout, String zkServiceList,String rootPath,Object obj) {
        this.sessionTimeout = sessionTimeout;
        this.zkServiceList = zkServiceList;
        this.rootPath = rootPath;
        this.obj = obj;

        if(sessionTimeout <= 0 || zkServiceList == null || rootPath == null){
            throw  new IllegalArgumentException("所有构造参数都不能为空，int 类型不能小于0");
        }

       /* init();*/
    }

    public void init() {
        try {
            String ip = InetAddress.getLocalHost().getHostAddress();
           /* if(StringUtils.isBlank(id)){
                String errMsg = "初始化 " + this.getClass().getName() + " 时，未获取到ip地址，请重试！";
                log.e(errMsg);
                throw new RuntimeException(errMsg);
            }*/
            this.ip = ip + "_";
            zkClient = ZkClientUtil.createConnection(zkServiceList, sessionTimeout, this);
            cdl.await();
        } catch (UnknownHostException e) {
            String errMsg = "获取ip地址错误：" + e.getMessage();
            log.error(errMsg);
            e.printStackTrace();
            throw new RuntimeException(e);
        } catch (InterruptedException e) {
            e.printStackTrace();
            throw new RuntimeException(e);
        }

        isExistsAndCreateRootPath();
        //链接成功后，创建子目录（临时目录序列号：目录规则：ip_序列号）
        String thisPath = ZkClientUtil.createPath(zkClient, this.rootPath + "/" + ip, "child", CreateMode.EPHEMERAL_SEQUENTIAL);
        if (thisPath != null) {
            this.thisPath = thisPath;
            log.info("[" + thisPath + "]：已在zk上注册");
            cdlIp.countDown();
        }else{
            String errMsg = rootPath + ":初始化zk客户端服务失败";
            log.error(errMsg);
            throw  new RuntimeException(errMsg);
        }
    }

    private void isExistsAndCreateRootPath() {
        try {
            boolean exists = ZkClientUtil.exists(zkClient, this.rootPath);
            if(!exists){
                JSONObject obj = new JSONObject();
                obj.put("info","true");
                ZkClientUtil.createPath(zkClient, this.rootPath,obj.toJSONString(), CreateMode.PERSISTENT);
            }
        }catch (RuntimeException e){
            log.error("创建：根目录：" + rootPath + "出现了一个致命的错误，如果您的项目是第一次使用该zookpeer服务器时启动，我们建议您，重新再次启动一次项目 或则 确保 zookpeer上存在该目录");
            e.printStackTrace();
        }
    }

    /**
     * 用与 监听 链接成功，和 根目录 数据改变 需要做的操作
     * @param watchedEvent
     */
    @Override
    public void process(WatchedEvent watchedEvent) {
        String apiName = "监听数据";
        if (Event.KeeperState.SyncConnected == watchedEvent.getState()) {  //应该是同步状态下？
            if(Event.EventType.None == watchedEvent.getType()){
                log.info(LogTemplateUtil.svMsg(apiName,"%s;id=%s;链接zk服务成功！状态=%s",getRootPath(),getIp(),watchedEvent.getState()+""));
                cdl.countDown();
                isExistsAndCreateRootPath();
                String data = ZkClientUtil.readData(zkClient, rootPath);
                firstRefresh(data,obj);
                this.handerList();
            }else if(Event.EventType.NodeDataChanged == watchedEvent.getType()){  //NodeDataChanged 该节点下的数据被改变
                log.info(LogTemplateUtil.svMsg(apiName,"%s;id=%s;监听到数据或节点改变",getRootPath(),getThisPath()));
                refresh(ZkClientUtil.readData(zkClient, watchedEvent.getPath()));
            }

            try {
                zkClient.exists(rootPath, this);
            } catch (KeeperException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }


    class ListWatcher implements Watcher{

        @Override
        public void process(WatchedEvent watchedEvent) {
            if(Event.EventType.NodeChildrenChanged == watchedEvent.getType()){  //NodeChildrenChanged 子节点列表改变
                log.info("[" + thisPath + "]子节点列表改变了:" + rootPath);
                handerList();
            }
        }
    }
    private ListWatcher listWatcher = new ListWatcher();
    /**
     * 子节点列表改变，需要判断 自己是否需要启动更新任务。
     */

    private void handerList(){
        String apiName = "处理节点信息改变";
        try {
            if(thisPath == null){
                cdlIp.await();
            }
            zkClient.exists(rootPath,this);
            List<String> list = zkClient.getChildren(rootPath, listWatcher);
            if(list == null || list.size() <= 0){
                return;
            }
            String[] sequentials = new String[list.size()];
            for (int i1 = 0; i1 < list.size(); i1++) {
                String[] split = list.get(i1).split("_");
                sequentials[i1] = split[1];
            }

            Arrays.sort(sequentials);

            String minSequential = sequentials[0];
            String[] split = thisPath.split("_");
            String thisSequential = split[1];
            log.info(LogTemplateUtil.svMsg(apiName,"最小序列=%s；节点数量=%s;所有节点名称列表=%s",minSequential,list.size()+"",JSONObject.toJSONString(list)));
            if (thisSequential.equals(minSequential)) { //如果自己是最小的序列，则执行更新操作
                //如果该自己做 leader 了，要先判断任务是否已经开启过了
                if(!isRunTask){
                    log.info(LogTemplateUtil.svMsg(apiName,"本机是最小序列，全局更新任务已被本机接管(%s)",thisPath));
                    executor(this,ZkClientUtil.readData(zkClient, rootPath));
                    isRunTask = true;
                }
            }
        } catch (KeeperException e) {
            log.error(LogTemplateUtil.svMsg(apiName,"出错了=%s",JSONObject.toJSONString(e)));
        } catch (InterruptedException e) {
            log.error(LogTemplateUtil.svMsg(apiName,"出错了=%s",JSONObject.toJSONString(e)));
        }
    }

    /** 往 zk 上更新数据 */
    public void writeData(String data){
        ZkClientUtil.writeData(zkClient, rootPath,data);
    }

    /**
     * 需要执行任务的回调方法， 该方法，只会被回调一次，所以请 在该方法内保证你的任务 会被正常启动
     * @param zkDistributedSingleNodeExecutor
     * @param data 该自己执行任务的时候，会获取一次数据推送给本方法
     */
    protected abstract void executor(ZkDistributedSingleNodeExecutor zkDistributedSingleNodeExecutor,String data);

    /**
     * 链接到zk 的时候 第一次取得数据，有可能为 创建父目录写入的固定数据.
     * 因为在继承本类的时候，要先初始化 父类，那么这个时候还未初始化完成，第一次数据将会被推送。
     * 而 自定义的业务参数，在构造器中传入的。还没有被赋值，所以不能在该方法中使用 构造传入的值
     * 由于业务实现问题：第一次接收到的数据，或许是其他机器上传的，所以应该被解析更新到本地。
     * （这是一个坑，因为微信的jssdk授权的ticket，需要依赖 基础支持中的token，如果在这里不更新，那么就会出现获取不到token的情况）
     * @param data
     * @param obj : 该参数就是解决上面所描述的问题。
     */
    protected abstract void firstRefresh(String data,Object obj);
    /**
     * 刷新数据，每次有数据改变的时候，将会通过次方法 推送，数据一般为json类型，（具体是什么结果，和你在任务中写入的结构一致）
     * @param data
     */
    protected abstract void refresh(String data);

    public int getSessionTimeout() {
        return sessionTimeout;
    }

    public String getZkServiceList() {
        return zkServiceList;
    }

    public String getRootPath() {
        return rootPath;
    }

    public String getIp() {
        return ip;
    }

    public String getThisPath() {
        return thisPath;
    }

    public boolean isRunTask() {
        return isRunTask;
    }

    public Object getObj() {
        return obj;
    }
}


